#include "dialogkafka.h"
#include "ui_dialogkafka.h"

#include <cstdarg>
#include <cstdio>
#include <vector>

#include <QDateTime>
// The dialog that currently receives log output. Set by the DialogKafka
// constructor and reset to nullptr by its destructor; uiprintf() writes to
// stderr while this is null.
DialogKafka * instance = nullptr;

// printf-style log sink, defined later in this file. The DialogKafka
// constructor installs it as KafkaClient::cbprintf.
void uiprintf (const char * format,...);

/**
 * Builds the dialog, attaches the message list model to the list view and
 * registers this dialog as the target of Kafka log output.
 *
 * @param parent  Parent widget, forwarded to QDialog.
 */
DialogKafka::DialogKafka(QWidget *parent)
	: QDialog(parent)
	, ui(new Ui::DialogKafka)
	, m_pMsgMod(new QStandardItemModel(this))
{
	ui->setupUi(this);

	// The list view shows the rows that slot_msg() appends to the model.
	ui->listView_msg->setModel(m_pMsgMod);

	// Queued on purpose: sig_msg may be emitted from the consumer thread, and a
	// queued connection runs slot_msg() from the receiver's event loop instead.
	connect(this,
			&DialogKafka::sig_msg,
			this,
			&DialogKafka::slot_msg,
			Qt::QueuedConnection);

	// Publish this dialog first, then point the Kafka log callback at
	// uiprintf(), which forwards to `instance`.
	instance = this;
	KafkaClient::cbprintf = uiprintf;
}

/**
 * Tears the dialog down.
 *
 * If a session is still running (consumer is non-null), it is shut down the
 * same way the Stop branch of on_pushButton_start_clicked() does it: stop the
 * timer, ask the consumer to stop, join the worker thread, then free the
 * consumer, producer and thread objects. Without this, closing the dialog
 * while running would leave the worker thread using a destroyed object and
 * would leak all three allocations.
 *
 * Only after the worker is gone are the log hooks detached and the UI freed.
 */
DialogKafka::~DialogKafka()
{
	if (consumer)
	{
		killTimer(m_nTimerID);
		m_nTimerID = -1;

		// Stop first, then join: the worker must have left consumer->run()
		// before anything it touches is deleted.
		consumer->stop();
		if (m_runthread && m_runthread->joinable())
			m_runthread->join();

		delete consumer;
		consumer = nullptr;

		delete producer;
		producer = nullptr;

		delete m_runthread;
		m_runthread = nullptr;
	}

	// No worker from this dialog is running any more: send log output back to
	// the default sink and unpublish this dialog.
	KafkaClient::cbprintf = csprintf;
	instance = nullptr;

	delete ui;
}


void DialogKafka::on_pushButton_start_clicked()
{
	if (consumer)
	{
		killTimer(m_nTimerID);
		m_nTimerID = -1;

		consumer->stop();
		m_runthread->join();

		delete consumer;
		consumer = nullptr;

		delete  producer;
		producer = nullptr;

		delete m_runthread;
		m_runthread = nullptr;

		ui->pushButton_start->setText("Start");

	}
	else
	{
		producer = new kafka_producer(
					ui->lineEdit_brokers->text().toStdString()
					,ui->lineEdit_topic->text().toStdString()
					);

		std::vector<std::string> topics;
		topics.push_back(ui->lineEdit_topic->text().toStdString());
		consumer = new kafka_consumer(
					ui->lineEdit_brokers->text().toStdString(),
					topics,
					ui->lineEdit_group->text().toStdString());

		m_runthread = new std::thread([&]()->void{
			consumer->run([&](rd_kafka_message_t * rkm)->void{
				if (rkm)
				{
					/* Proper message. */
					cbprintf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
						   rd_kafka_topic_name(rkm->rkt), rkm->partition,
						   rkm->offset);

					/* Print the message key. */
					if (rkm->key )
						cbprintf(" Key: %.*s\n", (int)rkm->key_len,
							   (const char *)rkm->key);
					/* Print the message value/payload. */
					if (rkm->payload)
						cbprintf(" Value: (%d bytes)\n", (int)rkm->len);
				}


			});
		});

		m_nTimerID = startTimer(500);
		ui->pushButton_start->setText("Stop");
	}
}

/**
 * Hands a log line to the dialog by emitting sig_msg.
 *
 * sig_msg is connected to slot_msg() with Qt::QueuedConnection in the
 * constructor, so the model is not modified from inside this call.
 *
 * @param msg  Text to append to the message list.
 */
void DialogKafka::shootMsg(QString msg)
{
	Q_EMIT sig_msg(msg);
}

/**
 * Appends one timestamped line to the message model and drops the oldest
 * rows so that at most 128 remain.
 *
 * @param m  Message text; shown as "<yyyy-MM-dd HH:mm:ss>" + ">" + m.
 */
void DialogKafka::slot_msg(QString m)
{
	const int maxRows = 128;

	const QString stamp = QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss");
	m_pMsgMod->appendRow(new QStandardItem(stamp + ">" + m));

	// Trim from the top so the newest lines are the ones that stay.
	const int overflow = m_pMsgMod->rowCount() - maxRows;
	if (overflow > 0)
		m_pMsgMod->removeRows(0,overflow);
}

/**
 * Timer tick: sends one 1024-byte message of rand() bytes through the
 * producer. The text of the key line edit is used as the message key when it
 * is not empty.
 *
 * Ticks from other timers, and ticks that arrive while no producer exists,
 * are ignored.
 *
 * @param evt  Timer event delivered by Qt.
 */
void DialogKafka::timerEvent(QTimerEvent * evt)
{
	if (evt->timerId()!=m_nTimerID || !producer)
		return;

	const int payloadLen = 1024;
	char payload[payloadLen];
	for (char & byte : payload)
		byte = rand();

	const std::string key = ui->lineEdit_key->text().toStdString();
	if (key.empty())
		producer->write(payload,payloadLen);
	else
		// NOTE(review): the meaning of the literal 10 is defined by
		// kafka_producer::write, which is not visible here - confirm.
		producer->write(payload,payloadLen,10,key.c_str(),key.size());
}

void uiprintf (const char * format,...)
{
	va_list args;
	va_start(args, format);
	if (instance)
	{
		char buf[1024];
		vsnprintf(buf,1024,format,args);
		buf[1023] = 0;
		instance->shootMsg(QString(buf).trimmed());
	}
	else
		vfprintf(stderr,format, args);
	va_end(args);
}
